{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**_pySpark Basics: Loading, Exploring and Saving Data_**"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "_by Jeff Levy (jlevy@urban.org) & Alex Engler (aengler@urban.org)_\n",
    "\n",
    "_Last Updated: 31 Jul 2017, Spark v2.1_"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "_Abstract: This guide will go over loading a CSV file into a dataframe, exploring it with basic commands, and finally writing it out to file in S3 and reading it back in._\n",
    "\n",
    "_Main operations used:_ `sc`, `spark`, `write.csv`, `read.csv`, `select`, `count`, `dtypes`, `schema`/`inferSchema`, `take`, `show`, `withColumnRenamed`, `columns`, `describe`, `coalesce`"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "***"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# The Spark Context\n",
    "\n",
    "An initial note about how pySpark interacts with Jupyter notebooks: **two global variables are created in the startup process.**  The first is the *spark context*:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<pyspark.context.SparkContext object at 0x7f12ae6bfe50>"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sc"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "And the second is the *spark session*:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<pyspark.sql.session.SparkSession object at 0x7f12ae649ad0>"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "spark"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "They provide access to many of the underlying structures used by pySpark, and you may see them referred to in code throughout the tutorials alongside functions imported from `pyspark.sql`."
   ]
  },
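  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Outside of a notebook that creates these globals for you (a standalone script, for example), the same two objects can be obtained explicitly.  The following is a minimal sketch, assuming pySpark is installed and importable; the application name `basics-demo` is only a placeholder.\n",
    "\n",
    "```python\n",
    "from pyspark.sql import SparkSession\n",
    "\n",
    "# getOrCreate() returns the session that is already running, if there is one\n",
    "# (as in this notebook); otherwise it starts a new one.\n",
    "spark = SparkSession.builder.appName('basics-demo').getOrCreate()\n",
    "\n",
    "# The spark context is available as an attribute of the session.\n",
    "sc = spark.sparkContext\n",
    "\n",
    "print(spark.version)\n",
    "```\n",
    "\n",
    "Because `getOrCreate` reuses an existing session, running this inside the notebook should simply hand back the objects shown above."
   ]
  },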
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Loading From CSV\n",
    "\n",
    "We can load our data from a CSV file in an S3 bucket.  There are three ways to handle data types (dtypes) for each column.  The easiest, but the most computationally expensive, is to pass `inferSchema=True` to the load method, which requires Spark to make an extra pass over the data.  The second way entails specifying the dtypes manually for every column by passing `schema=StructType(...)`, which is computationally efficient but may be difficult and prone to coder error for especially wide datasets.  The final option is to not specify a schema option at all, in which case Spark will assign all the columns string dtypes.  Note that dtypes can be changed later, though it is more costly than setting them correctly in the loading process.\n",
    "\n",
    "Loading the data with the schema inferred:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df = spark.read.csv('s3://ui-spark-social-science-public/data/Performance_2015Q1.txt', header=False, inferSchema=True, sep='|')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Example loading of the same data by passing a custom schema:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "\n",
       "from pyspark.sql.types import DateType, TimestampType, IntegerType, FloatType, LongType, DoubleType, StringType\n",
       "from pyspark.sql.types import StructType, StructField\n",
       "custom_schema = StructType([StructField('_c0', LongType(), True),\n",
       "                           StructField('_c1', StringType(), True),\n",
       "                           StructField('_c2', StringType(), True),\n",
       "                           StructField('_c3', DoubleType(), True),\n",
       "                           StructField('_c4', DoubleType(), True),\n",
       "                           StructField('_c5', IntegerType(), True),\n",
       "                           ...\n",
       "                           StructField('_c27', StringType(), True)])\n",
       "df = spark.read.csv('s3://ui-spark-social-science-public/data/Performance_2015Q1.txt', header=False, schema=custom_schema, sep='|')\n"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "\"\"\"\n",
    "from pyspark.sql.types import DateType, TimestampType, IntegerType, FloatType, LongType, DoubleType, StringType\n",
    "from pyspark.sql.types import StructType, StructField\n",
    "\n",
    "custom_schema = StructType([StructField('_c0', LongType(), True),\n",
    "                           StructField('_c1', StringType(), True),\n",
    "                           StructField('_c2', StringType(), True),\n",
    "                           StructField('_c3', DoubleType(), True),\n",
    "                           StructField('_c4', DoubleType(), True),\n",
    "                           StructField('_c5', IntegerType(), True),\n",
    "                           ...\n",
    "                           StructField('_c27', StringType(), True)])\n",
    "                           \n",
    "df = spark.read.csv('s3://ui-spark-social-science-public/data/Performance_2015Q1.txt', header=False, schema=custom_schema, sep='|')\n",
    "\"\"\";"
   ]
  },
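  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Since the schema above is abbreviated, here is a small self-contained sketch of the same idea that can be run end to end.  It assumes Spark is running in local mode so that a temporary local file is readable (on a cluster the path would need to live on shared storage such as S3); the file contents and the names `sample_path`, `sample_schema`, `sample_df`, `strings_df` and `recast_df` are made up for illustration.  It also shows the third option, loading everything as strings and changing a dtype afterwards with `cast`.\n",
    "\n",
    "```python\n",
    "import os\n",
    "import tempfile\n",
    "\n",
    "from pyspark.sql import SparkSession\n",
    "from pyspark.sql.types import (StructType, StructField, LongType,\n",
    "                               StringType, DoubleType, IntegerType)\n",
    "\n",
    "spark = SparkSession.builder.getOrCreate()\n",
    "\n",
    "# Write a tiny pipe-delimited file with made-up values to a temporary folder.\n",
    "sample_path = os.path.join(tempfile.mkdtemp(), 'sample.txt')\n",
    "with open(sample_path, 'w') as f:\n",
    "    f.write('1001|01/01/2015|4.125|0\\n')\n",
    "    f.write('1001|02/01/2015|4.125|1\\n')\n",
    "\n",
    "# One StructField per column: name, dtype, and whether nulls are allowed.\n",
    "sample_schema = StructType([StructField('_c0', LongType(), True),\n",
    "                            StructField('_c1', StringType(), True),\n",
    "                            StructField('_c2', DoubleType(), True),\n",
    "                            StructField('_c3', IntegerType(), True)])\n",
    "\n",
    "# Option two: pass the schema explicitly, so nothing has to be inferred.\n",
    "sample_df = spark.read.csv(sample_path, header=False, schema=sample_schema, sep='|')\n",
    "print(sample_df.dtypes)\n",
    "\n",
    "# Option three: no schema at all, so every column is read as a string ...\n",
    "strings_df = spark.read.csv(sample_path, header=False, sep='|')\n",
    "\n",
    "# ... and a column is converted afterwards by casting it to another dtype.\n",
    "recast_df = strings_df.withColumn('_c2', strings_df['_c2'].cast('double'))\n",
    "print(recast_df.dtypes)\n",
    "```\n",
    "\n",
    "Comparing the two printed dtype lists shows which columns were typed at load time and which were converted after the fact."
   ]
  },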
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "One way to combine inferring and specifying a schema is with a large, unfamiliar dataset that you know you will need to load and work with repeatedly.  The first time you load it, use `inferSchema` and make note of the dtypes it assigns.  Use that information to build the custom schema, so that when you load the data in the future you avoid the extra processing time necessary for inferring.\n",
    "\n",
    "# Exploring the Data\n",
    "\n",
    "Our data is now loaded into a dataframe that we named `df`, with all the dtypes inferred.  First we'll count the number of rows it found:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "3526154"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.count()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Then we look at the column-by-column dtypes the system estimated:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('_c0', 'bigint'), ('_c1', 'string'), ('_c2', 'string'), ('_c3', 'double'), ('_c4', 'double'), ('_c5', 'int'), ('_c6', 'int'), ('_c7', 'int'), ('_c8', 'string'), ('_c9', 'int'), ('_c10', 'string'), ('_c11', 'string'), ('_c12', 'int'), ('_c13', 'string'), ('_c14', 'string'), ('_c15', 'string'), ('_c16', 'string'), ('_c17', 'string'), ('_c18', 'string'), ('_c19', 'string'), ('_c20', 'string'), ('_c21', 'string'), ('_c22', 'string'), ('_c23', 'string'), ('_c24', 'string'), ('_c25', 'string'), ('_c26', 'int'), ('_c27', 'string')]"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.dtypes"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For each pairing (a `tuple` object in Python, denoted by the parentheses), the first entry is the column name and the second is the dtype.  Notice that this data has no headers with it (we specified `header=False` when we loaded it), so Spark used its default naming convention of `_c0, _c1, ... _cn`.  We'll make some changes to that in a minute.\n",
    "\n",
    "Take a peek at five rows:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(_c0=100002091588, _c1=u'01/01/2015', _c2=u'OTHER', _c3=4.125, _c4=None, _c5=0, _c6=360, _c7=360, _c8=u'01/2045', _c9=16740, _c10=u'0', _c11=u'N', _c12=None, _c13=None, _c14=None, _c15=None, _c16=None, _c17=None, _c18=None, _c19=None, _c20=None, _c21=None, _c22=None, _c23=None, _c24=None, _c25=None, _c26=None, _c27=None), Row(_c0=100002091588, _c1=u'02/01/2015', _c2=None, _c3=4.125, _c4=None, _c5=1, _c6=359, _c7=359, _c8=u'01/2045', _c9=16740, _c10=u'0', _c11=u'N', _c12=None, _c13=None, _c14=None, _c15=None, _c16=None, _c17=None, _c18=None, _c19=None, _c20=None, _c21=None, _c22=None, _c23=None, _c24=None, _c25=None, _c26=None, _c27=None), Row(_c0=100002091588, _c1=u'03/01/2015', _c2=None, _c3=4.125, _c4=None, _c5=2, _c6=358, _c7=358, _c8=u'01/2045', _c9=16740, _c10=u'0', _c11=u'N', _c12=None, _c13=None, _c14=None, _c15=None, _c16=None, _c17=None, _c18=None, _c19=None, _c20=None, _c21=None, _c22=None, _c23=None, _c24=None, _c25=None, _c26=None, _c27=None), Row(_c0=100002091588, _c1=u'04/01/2015', _c2=None, _c3=4.125, _c4=None, _c5=3, _c6=357, _c7=357, _c8=u'01/2045', _c9=16740, _c10=u'0', _c11=u'N', _c12=None, _c13=None, _c14=None, _c15=None, _c16=None, _c17=None, _c18=None, _c19=None, _c20=None, _c21=None, _c22=None, _c23=None, _c24=None, _c25=None, _c26=None, _c27=None), Row(_c0=100002091588, _c1=u'05/01/2015', _c2=None, _c3=4.125, _c4=None, _c5=4, _c6=356, _c7=356, _c8=u'01/2045', _c9=16740, _c10=u'0', _c11=u'N', _c12=None, _c13=None, _c14=None, _c15=None, _c16=None, _c17=None, _c18=None, _c19=None, _c20=None, _c21=None, _c22=None, _c23=None, _c24=None, _c25=None, _c26=None, _c27=None)]"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.take(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Each row is printed in the format `column_name=value`.  The formatting above is ugly because `take` doesn't try to make it pretty; it just returns the row objects themselves.  We can use `show` instead, which attempts to format the data better, but because there are so many columns in this case the output of `show` doesn't fit and each line wraps down to the next.  We'll use `show` on a subset below.\n",
    "\n",
    "# Selecting & Renaming Columns"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `select` operation returns a new dataframe containing a subset of the columns of an existing one.  The columns to keep are passed to `select` as column name strings.  Here, we create a new dataframe called `df_lim` that includes only the first 14 columns of `df`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(_c0=100002091588, _c1=u'01/01/2015', _c2=u'OTHER', _c3=4.125, _c4=None, _c5=0, _c6=360, _c7=360, _c8=u'01/2045', _c9=16740, _c10=u'0', _c11=u'N', _c12=None, _c13=None)]"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df_lim = df.select('_c0','_c1','_c2', '_c3', '_c4', '_c5', '_c6', '_c7', '_c8', '_c9', '_c10', '_c11', '_c12', '_c13')\n",
    "df_lim.take(1)"
   ]
  },
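  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Typing out every column name gets tedious for wide data.  As a hedged alternative, `select` also accepts a Python list, so the names can be sliced out of `columns` instead.  The sketch below uses a small made-up dataframe (`demo_df` and its values are illustrative, not part of the loan data) and prints the result with `show`.\n",
    "\n",
    "```python\n",
    "from pyspark.sql import SparkSession\n",
    "\n",
    "spark = SparkSession.builder.getOrCreate()\n",
    "\n",
    "# A small made-up dataframe standing in for the wide one loaded above.\n",
    "demo_df = spark.createDataFrame([(1, 'a', 2.5, 'x'), (2, 'b', 3.5, 'y')],\n",
    "                                ['_c0', '_c1', '_c2', '_c3'])\n",
    "\n",
    "# columns is a plain Python list, so it can be sliced like any other list.\n",
    "first_three = demo_df.columns[:3]\n",
    "demo_lim = demo_df.select(first_three)\n",
    "\n",
    "# show() prints a formatted table; the argument is the number of rows to print.\n",
    "demo_lim.show(2)\n",
    "```\n",
    "\n",
    "Applied to the data in this guide, the same pattern would be `df.select(df.columns[:14])`."
   ]
  },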
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can rename columns one at a time, or several at a time:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[loan_id: bigint, period: string, _c2: string, _c3: double, _c4: double, _c5: int, _c6: int, _c7: int, _c8: string, _c9: int, _c10: string, _c11: string, _c12: int, _c13: string]"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df_lim = df_lim.withColumnRenamed('_c0','loan_id').withColumnRenamed('_c1','period')\n",
    "df_lim"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "You can see that column `_c0` has been renamed to `loan_id`, and `_c1` to `period`.\n",
    "\n",
    "We can also rename many of them in a loop using two lists or a dictionary:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "old_names = ['_c2', '_c3', '_c4', '_c5', '_c6', '_c7', '_c8', '_c9', '_c10', '_c11', '_c12', '_c13']\n",
    "new_names = ['servicer_name', 'new_int_rt', 'act_endg_upb', 'loan_age', 'mths_remng', 'aj_mths_remng', 'dt_matr', 'cd_msa', 'delq_sts', 'flag_mod', 'cd_zero_bal', 'dt_zero_bal']\n",
    "for old, new in zip(old_names, new_names):\n",
    "    df_lim = df_lim.withColumnRenamed(old, new)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(loan_id=100002091588, period=u'01/01/2015', servicer_name=u'OTHER', new_int_rt=4.125, act_endg_upb=None, loan_age=0, mths_remng=360, aj_mths_remng=360, dt_matr=u'01/2045', cd_msa=16740, delq_sts=u'0', flag_mod=u'N', cd_zero_bal=None, dt_zero_bal=None)]"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df_lim.take(1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['loan_id', 'period', 'servicer_name', 'new_int_rt', 'act_endg_upb', 'loan_age', 'mths_remng', 'aj_mths_remng', 'dt_matr', 'cd_msa', 'delq_sts', 'flag_mod', 'cd_zero_bal', 'dt_zero_bal']"
      ]
     },
     "execution_count": 12,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df_lim.columns"
   ]
  },
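  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The loop above used two lists.  For completeness, here is a sketch of the dictionary variant, along with `toDF`, which replaces every column name in one call.  The dataframe `demo_df` and the mapping `rename_map` are made up for illustration.\n",
    "\n",
    "```python\n",
    "from pyspark.sql import SparkSession\n",
    "\n",
    "spark = SparkSession.builder.getOrCreate()\n",
    "\n",
    "demo_df = spark.createDataFrame([(1, 'a', 2.5)], ['_c0', '_c1', '_c2'])\n",
    "\n",
    "# Dictionary version of the loop: keys are old names, values are new names.\n",
    "rename_map = {'_c0': 'loan_id', '_c1': 'period', '_c2': 'new_int_rt'}\n",
    "renamed = demo_df\n",
    "for old, new in rename_map.items():\n",
    "    renamed = renamed.withColumnRenamed(old, new)\n",
    "print(renamed.columns)\n",
    "\n",
    "# toDF takes one new name per column, in positional order.\n",
    "renamed_all = demo_df.toDF('loan_id', 'period', 'new_int_rt')\n",
    "print(renamed_all.columns)\n",
    "```\n",
    "\n",
    "The dictionary form is handy when only some columns need new names; `toDF` is shorter when all of them do."
   ]
  },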
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Describe\n",
    "\n",
    "Now we'll describe the data.  Note that `describe` returns a new dataframe with the information, and so must have `show` called after it if our goal is to view it (note the nice formatting in this case).  This can be called on one or more specific columns, as we do here, or the entire dataframe by passing no columns to describe:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+--------------------+-------------------+------------------+\n",
      "|summary|       servicer_name|         new_int_rt|          loan_age|\n",
      "+-------+--------------------+-------------------+------------------+\n",
      "|  count|              382039|            3526154|           3526154|\n",
      "|   mean|                null|  4.178168090219519| 5.134865351881966|\n",
      "| stddev|                null|0.34382335723646673|3.3833930336063465|\n",
      "|    min|  CITIMORTGAGE, INC.|               2.75|                -1|\n",
      "|    max|WELLS FARGO BANK,...|              6.125|                34|\n",
      "+-------+--------------------+-------------------+------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_described = df_lim.describe('servicer_name', 'new_int_rt', 'loan_age')\n",
    "df_described.show()"
   ]
  },
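  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Because `describe` returns an ordinary dataframe, its values can also be pulled back into Python rather than only printed.  The following is a minimal sketch on a made-up dataframe (`demo_df`, `summary` and `stats` are illustrative names); note that the summary values come back as strings.\n",
    "\n",
    "```python\n",
    "from pyspark.sql import SparkSession\n",
    "\n",
    "spark = SparkSession.builder.getOrCreate()\n",
    "\n",
    "demo_df = spark.createDataFrame([(1, 4.0), (2, 5.0), (3, 6.0)],\n",
    "                                ['loan_age', 'new_int_rt'])\n",
    "\n",
    "# Passing no columns describes the entire dataframe.\n",
    "summary = demo_df.describe()\n",
    "summary.show()\n",
    "\n",
    "# The summary is tiny, so collect() can safely bring it back as Row objects.\n",
    "stats = {row['summary']: row['new_int_rt'] for row in summary.collect()}\n",
    "\n",
    "# Values in the summary are strings, so convert before doing arithmetic.\n",
    "print(float(stats['mean']))\n",
    "```\n",
    "\n",
    "Collecting is only appropriate here because a `describe` result has just a handful of rows."
   ]
  },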
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "# Writing to S3\n",
    "\n",
    "And finally, we can write data out to our S3 bucket.  Note that if your data is small enough to be collected onto one computer, writing it is easy.  We'll use the dataframe we just created using `describe` above as an example:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df_described.write.format('com.databricks.spark.csv').option(\"header\", \"true\").save('s3://ui-spark-social-science-public/data/mycsv')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "The above line will turn *each partition* of this dataframe into a .csv file.  This is an important note; if your data is very big it may be spread across a lot of partitions.  Writing many files may be necessary if your data is too large to fit in one csv file, but if your data should fit you can include the `coalesce` command, like this:\n",
    "    \n",
    "    df_described.coalesce(1).write.format(\"com.databricks.spark.csv\").option(\"header\", \"true\").save(\"s3://ui-spark-social-science-public/data/mycsv\")\n",
    "\n",
    "This tells Spark to combine all the data into 1 partition (or however many you pass in as the value) before writing.  Again, only do this if your data isn't very large.  See the pySpark tutorial on subsetting for more."
   ]
  },
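  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To round out the write step, here is a hedged sketch of writing with the `write.csv` shorthand and reading the result back in with `read.csv`.  A temporary local folder stands in for the S3 path, which assumes Spark is running in local mode; `demo_df`, `out_path` and `read_back` are illustrative names.\n",
    "\n",
    "```python\n",
    "import os\n",
    "import tempfile\n",
    "\n",
    "from pyspark.sql import SparkSession\n",
    "\n",
    "spark = SparkSession.builder.getOrCreate()\n",
    "\n",
    "demo_df = spark.createDataFrame([(1, 'a'), (2, 'b')], ['loan_id', 'flag_mod'])\n",
    "\n",
    "# A temporary local folder stands in for the S3 path used above.\n",
    "out_path = os.path.join(tempfile.mkdtemp(), 'mycsv')\n",
    "\n",
    "# Spark creates a folder at out_path holding one part file per partition;\n",
    "# coalesce(1) reduces that to a single part file.\n",
    "demo_df.coalesce(1).write.csv(out_path, header=True)\n",
    "\n",
    "# Pointing read.csv at the folder picks up every part file inside it.\n",
    "read_back = spark.read.csv(out_path, header=True, inferSchema=True)\n",
    "read_back.show()\n",
    "```\n",
    "\n",
    "Swapping `out_path` for an `s3://` path that your cluster can write to should give the same round trip against S3."
   ]
  },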
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
